// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
	"encoding/json"
	"sort"
	"strconv"
	"sync"
	"time"

	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/types/known/structpb"
	"google.golang.org/protobuf/types/known/wrapperspb"
	extensions "istio.io/api/extensions/v1alpha1"
	"istio.io/api/networking/v1alpha3"
	"istio.io/istio/pkg/config"
	"istio.io/istio/pkg/config/schema/gvk"
	"istio.io/pkg/log"

	"github.com/alibaba/higress/v2/pkg/common"
	higressconfig "github.com/alibaba/higress/v2/pkg/config"
	ingress "github.com/alibaba/higress/v2/pkg/ingress/kube/common"
	"github.com/alibaba/higress/v2/registry"
)

// Cache is the in-memory registry store. It keeps service wrappers, proxy
// wrappers and per-kind config objects, together with the pending
// incremental update/delete lists consumed by the push path.
type Cache interface {
	// UpdateServiceWrapper stores data under service and queues it as a
	// pending incremental update. The create time of an already cached
	// entry is carried over; a pending deferred delete of the same service
	// is cancelled.
	UpdateServiceWrapper(service string, data *ingress.ServiceWrapper)
	// DeleteServiceWrapper queues a cached service as a pending incremental
	// delete and marks it for deferred removal (see PurgeStaleItems).
	// Unknown services are ignored.
	DeleteServiceWrapper(service string)
	// UpdateProxyWrapper stores data under name, carrying over the create
	// time of an already cached entry.
	UpdateProxyWrapper(name string, data *ingress.ProxyWrapper)
	// DeleteProxyWrapper marks a cached proxy for deferred removal (see
	// PurgeStaleItems). Unknown proxies are ignored.
	DeleteProxyWrapper(name string)
	// UpdateConfigCache stores config under kind/key. With forceDelete set,
	// key is removed from every kind instead. A nil config without
	// forceDelete is a no-op.
	UpdateConfigCache(kind config.GroupVersionKind, key string, config *config.Config, forceDelete bool)
	// GetAllConfigs returns the configs cached for kind. For the WasmPlugin
	// kind the cached rules are aggregated into one generated WasmPlugin
	// config instead.
	GetAllConfigs(kind config.GroupVersionKind) map[string]*config.Config
	// PurgeStaleItems removes every service and proxy marked for deferred
	// deletion and reports whether anything was marked.
	PurgeStaleItems() bool
	// UpdateServiceEntryEndpointWrapper updates locality and labels of the
	// endpoint of service whose address equals ip.
	UpdateServiceEntryEndpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string)
	// GetServiceByEndpoints returns, per service key, the sorted distinct
	// versions of endpoints that match both endpoints and requestVersions.
	GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string
	// GetAllServiceEntry returns deep copies of all cached ServiceEntry
	// objects that have at least one host.
	GetAllServiceEntry() []*v1alpha3.ServiceEntry
	// GetAllServiceWrapper returns deep copies of all cached service
	// wrappers and resets the pending incremental lists.
	GetAllServiceWrapper() []*ingress.ServiceWrapper
	// GetAllProxyWrapper returns deep copies of all cached proxy wrappers.
	GetAllProxyWrapper() []*ingress.ProxyWrapper
	// GetAllDestinationRuleWrapper returns the destination rules attached to
	// cached services plus those cached as DestinationRule configs, and
	// resets the pending incremental lists.
	GetAllDestinationRuleWrapper() []*ingress.WrapperDestinationRule
	// GetIncrementalServiceWrapper returns deep copies of the pending
	// updated and deleted service wrappers and resets both lists.
	GetIncrementalServiceWrapper() (updatedList []*ingress.ServiceWrapper, deletedList []*ingress.ServiceWrapper)
	// RemoveEndpointByIp removes the endpoint with the given address from
	// the services recorded for that address in the ip index.
	RemoveEndpointByIp(ip string)
}

// NewCache returns an empty in-memory Cache. Every internal map and pending
// list is allocated up front, so the result is usable immediately.
func NewCache() Cache {
	st := new(store)
	st.mux = new(sync.RWMutex)

	// Config objects, keyed by kind and then by config key.
	st.configs = map[string]map[string]*config.Config{}

	// Service wrappers and their bookkeeping.
	st.sew = map[string]*ingress.ServiceWrapper{}
	st.toBeUpdated = []*ingress.ServiceWrapper{}
	st.toBeDeleted = []*ingress.ServiceWrapper{}
	st.ip2services = map[string]map[string]bool{}
	st.deferredDeleteServices = map[string]struct{}{}

	// Proxy wrappers and their bookkeeping.
	st.pw = map[string]*ingress.ProxyWrapper{}
	st.deferredDeleteProxies = map[string]struct{}{}

	return st
}

// store is the Cache implementation returned by NewCache.
type store struct {
	// mux is the lock taken by the exported methods before they touch the
	// fields below.
	mux *sync.RWMutex

	// configs holds cached config objects, keyed by kind.String() and then
	// by the caller-supplied config key.
	configs map[string]map[string]*config.Config

	// Service state. sew maps a service name to its wrapper. toBeUpdated and
	// toBeDeleted are the pending incremental lists. ip2services maps an
	// endpoint address to the set of service names using it.
	// deferredDeleteServices is the set of services removed from sew by
	// PurgeStaleItems.
	sew                    map[string]*ingress.ServiceWrapper
	toBeUpdated            []*ingress.ServiceWrapper
	toBeDeleted            []*ingress.ServiceWrapper
	ip2services            map[string]map[string]bool
	deferredDeleteServices map[string]struct{}

	// Proxy state. pw maps a proxy name to its wrapper.
	// deferredDeleteProxies is the set of proxies removed from pw by
	// PurgeStaleItems.
	pw                    map[string]*ingress.ProxyWrapper
	deferredDeleteProxies map[string]struct{}
}

// GetAllConfigs returns the configs cached for kind; the result is never nil.
//
// For every kind except gvk.WasmPlugin the result is a shallow copy of the
// cached map, so callers may iterate or modify it without holding the store
// lock and without affecting the cache.
//
// For gvk.WasmPlugin the cached entries are treated as MCP server rules
// (*registry.McpServerRule). They are aggregated, in key order so the output
// is stable between calls, into a single generated WasmPlugin config returned
// under the key "wasm". An empty map is returned when there are no rules or
// the plugin config cannot be encoded.
func (s *store) GetAllConfigs(kind config.GroupVersionKind) map[string]*config.Config {
	// Nothing below mutates the store, so a read lock is sufficient.
	s.mux.RLock()
	defer s.mux.RUnlock()

	cfgs, exist := s.configs[kind.String()]
	if !exist {
		return map[string]*config.Config{}
	}

	if kind != gvk.WasmPlugin {
		// Hand out a copy: the internal map is only safe to touch under s.mux.
		out := make(map[string]*config.Config, len(cfgs))
		for key, cfg := range cfgs {
			out[key] = cfg
		}
		return out
	}

	// Map iteration order is random; sort the keys so the generated plugin
	// config (rule order and namespace) is deterministic.
	keys := make([]string, 0, len(cfgs))
	for key := range cfgs {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	pluginConfig := &registry.WasmPluginConfig{}
	var ns string
	for _, key := range keys {
		cfg := cfgs[key]
		rule, ok := cfg.Spec.(*registry.McpServerRule)
		if !ok {
			// Skip instead of panicking on an unexpected spec type.
			log.Errorf("unexpected spec type %T for mcp server rule %s, skip", cfg.Spec, key)
			continue
		}
		ns = cfg.Namespace
		pluginConfig.Rules = append(pluginConfig.Rules, rule)
	}
	if len(pluginConfig.Rules) == 0 {
		log.Infof("there is no mcp server rule exist, skip generate wasm plugin")
		return map[string]*config.Config{}
	}

	// Round-trip through JSON to turn the typed plugin config into the
	// generic protobuf Struct expected by WasmPlugin.PluginConfig.
	rulesBytes, err := json.Marshal(pluginConfig)
	if err != nil {
		log.Errorf("marshal mcp wasm plugin config error %v", err)
		return map[string]*config.Config{}
	}
	pbs := &structpb.Struct{}
	if err = protojson.Unmarshal(rulesBytes, pbs); err != nil {
		log.Errorf("unmarshal mcp wasm plugin config error %v", err)
		return map[string]*config.Config{}
	}
	wasmPlugin := &extensions.WasmPlugin{
		ImagePullPolicy: extensions.PullPolicy_Always,
		Phase:           extensions.PluginPhase_UNSPECIFIED_PHASE,
		Priority:        &wrapperspb.Int32Value{Value: 999},
		PluginConfig:    pbs,
		Url:             higressconfig.McpServerWasmImageUrl,
		FailStrategy:    extensions.FailStrategy_FAIL_OPEN,
	}

	return map[string]*config.Config{"wasm": {
		Meta: config.Meta{
			GroupVersionKind: gvk.WasmPlugin,
			Name:             "istio-autogenerated-mcp-wasmplugin",
			Namespace:        ns,
		},
		Spec: wasmPlugin,
	}}
}

// UpdateConfigCache adds, replaces or removes a cached config.
//
// When forceDelete is true, key is removed from the cache of every kind and
// cfg is ignored. Otherwise cfg is stored under kind/key; a nil cfg is a
// no-op.
func (s *store) UpdateConfigCache(kind config.GroupVersionKind, key string, cfg *config.Config, forceDelete bool) {
	if !forceDelete && cfg == nil {
		return
	}

	s.mux.Lock()
	defer s.mux.Unlock()

	if forceDelete {
		// The kind is not consulted on delete: the key is dropped everywhere.
		for _, byKey := range s.configs {
			delete(byKey, key)
		}
		log.Infof("Delete config %s in cache", key)
		return
	}

	kindName := kind.String()
	bucket, known := s.configs[kindName]
	if !known {
		bucket = make(map[string]*config.Config)
		s.configs[kindName] = bucket
	}

	if _, present := bucket[key]; present {
		log.Infof("Update kind %s config %s", kindName, key)
	} else {
		log.Infof("Add kind %s config %s", kindName, key)
	}
	bucket[key] = cfg
}

// UpdateServiceEntryEndpointWrapper updates, in place, the first endpoint of
// the cached service whose address equals ip.
//
// If regionId is non-empty the endpoint locality becomes regionId, or
// "regionId/zoneId" when zoneId is non-empty as well. Every entry of labels
// is merged into the endpoint labels; for the Dubbo protocol the "version"
// label is stored under "appversion" instead. Unknown services or addresses
// are ignored.
func (s *store) UpdateServiceEntryEndpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string) {
	s.mux.Lock()
	defer s.mux.Unlock()

	se, exist := s.sew[service]
	if !exist {
		return
	}

	for _, ep := range se.ServiceEntry.Endpoints {
		if ep.Address != ip {
			continue
		}

		if len(regionId) != 0 {
			ep.Locality = regionId
			if len(zoneId) != 0 {
				ep.Locality = regionId + "/" + zoneId
			}
		}

		// An endpoint may have been created without labels; writing into a
		// nil map would panic.
		if len(labels) != 0 && ep.Labels == nil {
			ep.Labels = make(map[string]string, len(labels))
		}
		for k, v := range labels {
			if protocol == common.Dubbo.String() && k == "version" {
				ep.Labels["appversion"] = v
				continue
			}
			ep.Labels[k] = v
		}

		// ep is a pointer into the endpoint slice, so the changes above are
		// already visible through the cached wrapper.
		return
	}
}

// UpdateServiceWrapper caches data under service and queues it as a pending
// incremental update. An already cached entry keeps its original create
// time; a new entry is stamped with the current time. A pending deferred
// delete of the same service is cancelled.
func (s *store) UpdateServiceWrapper(service string, data *ingress.ServiceWrapper) {
	s.mux.Lock()
	defer s.mux.Unlock()

	prev, known := s.sew[service]
	if known {
		data.SetCreateTime(prev.GetCreateTime())
	} else {
		data.SetCreateTime(time.Now())
	}

	log.Debugf("mcp service entry update, name: %s, data: %v", service, data)

	s.sew[service] = data
	s.toBeUpdated = append(s.toBeUpdated, data)

	// A service that was just updated must survive the next purge.
	if _, pending := s.deferredDeleteServices[service]; pending {
		delete(s.deferredDeleteServices, service)
		log.Debugf("service in deferredDeleteServices updated, host: %s", service)
	}
	log.Infof("ServiceEntry updated, host: %s", service)
}

// DeleteServiceWrapper schedules a cached service for removal: its wrapper is
// queued as a pending incremental delete and the service is marked for
// deferred deletion. The wrapper stays in the cache until PurgeStaleItems
// runs. Services that are not cached are ignored.
func (s *store) DeleteServiceWrapper(service string) {
	s.mux.Lock()
	defer s.mux.Unlock()

	wrapper, cached := s.sew[service]
	if !cached {
		return
	}
	s.deferredDeleteServices[service] = struct{}{}
	s.toBeDeleted = append(s.toBeDeleted, wrapper)
}

// UpdateProxyWrapper caches data under name. An already cached entry keeps
// its original create time; a new entry is stamped with the current time.
// A pending deferred delete of the same proxy is cancelled so that the next
// PurgeStaleItems does not drop the freshly updated entry.
func (s *store) UpdateProxyWrapper(name string, data *ingress.ProxyWrapper) {
	s.mux.Lock()
	defer s.mux.Unlock()

	if old, exist := s.pw[name]; exist {
		data.SetCreateTime(old.GetCreateTime())
	} else {
		data.SetCreateTime(time.Now())
	}

	log.Debugf("mcp proxy entry update, name: %s, data: %v", name, data)

	s.pw[name] = data
	// proxy is updated, should not be deleted. This must consult the proxy
	// set, not the service set: the two are keyed independently.
	if _, ok := s.deferredDeleteProxies[name]; ok {
		delete(s.deferredDeleteProxies, name)
		log.Debugf("proxy in deferredDeleteProxies updated, na: %s", name)
	}
	log.Infof("ProxyWrapper updated, name: %s", name)
}

// DeleteProxyWrapper marks a cached proxy for deferred deletion. The wrapper
// stays in the cache until PurgeStaleItems runs. Proxies that are not cached
// are ignored.
func (s *store) DeleteProxyWrapper(name string) {
	s.mux.Lock()
	defer s.mux.Unlock()

	_, cached := s.pw[name]
	if !cached {
		return
	}
	s.deferredDeleteProxies[name] = struct{}{}
}

// PurgeStaleItems drops every service and proxy that was marked for deferred
// deletion and empties both marker sets. It reports whether at least one item
// was marked.
//
// It should only be called when reconcile is done.
func (s *store) PurgeStaleItems() bool {
	s.mux.Lock()
	defer s.mux.Unlock()

	// Anything marked will be removed below, so the result is known up front.
	purged := len(s.deferredDeleteServices) > 0 || len(s.deferredDeleteProxies) > 0

	for host := range s.deferredDeleteServices {
		delete(s.deferredDeleteServices, host)
		delete(s.sew, host)
		log.Infof("ServiceEntry deleted, host: %s", host)
	}

	for name := range s.deferredDeleteProxies {
		delete(s.deferredDeleteProxies, name)
		delete(s.pw, name)
		log.Infof("ProxyWrapper deleted, name: %s", name)
	}

	return purged
}

// GetServiceByEndpoints finds the cached services that have an endpoint whose
// "address:port" (port taken from the entry for protocol) is present in
// endpoints and whose versionKey label is present in requestVersions.
//
// The result maps serviceName + "#@" + suffix to the matching versions,
// sorted and with duplicates removed, e.g. ["v1", "v2"].
func (s *store) GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string {
	s.mux.RLock()
	defer s.mux.RUnlock()

	result := make(map[string][]string)
	for _, wrapper := range s.sew {
		for _, ep := range wrapper.ServiceEntry.Endpoints {
			port, ok := ep.Ports[protocol.String()]
			if !ok {
				continue
			}

			addr := ep.Address + common.ColonSeparator + strconv.Itoa(int(port))
			if _, listed := endpoints[addr]; !listed {
				continue
			}

			version, labelled := ep.Labels[versionKey]
			if !labelled {
				continue
			}
			if _, wanted := requestVersions[version]; !wanted {
				continue
			}

			key := wrapper.ServiceName + common.SpecialSeparator + wrapper.Suffix
			result[key] = append(result[key], version)
		}
	}

	// Sort each list, then compact runs of equal values in place. Every list
	// holds at least one version because a key is only created by an append.
	for key, versions := range result {
		sort.Strings(versions)
		unique := versions[:1]
		for _, v := range versions[1:] {
			if v != unique[len(unique)-1] {
				unique = append(unique, v)
			}
		}
		result[key] = unique
	}

	return result
}

// GetAllServiceEntry returns deep copies of every cached ServiceEntry that
// has at least one host, ordered by first host in descending order. It is
// used for xds push.
func (s *store) GetAllServiceEntry() []*v1alpha3.ServiceEntry {
	s.mux.RLock()
	defer s.mux.RUnlock()

	entries := make([]*v1alpha3.ServiceEntry, 0)
	for _, wrapper := range s.sew {
		// Entries without hosts are skipped; the sort below relies on Hosts[0].
		if len(wrapper.ServiceEntry.Hosts) > 0 {
			entries = append(entries, wrapper.ServiceEntry.DeepCopy())
		}
	}

	sort.Slice(entries, func(a, b int) bool {
		return entries[b].Hosts[0] < entries[a].Hosts[0]
	})
	return entries
}

// GetAllServiceWrapper returns deep copies of every cached ServiceWrapper for
// xds push. As a side effect the pending incremental update/delete lists are
// reset.
func (s *store) GetAllServiceWrapper() []*ingress.ServiceWrapper {
	// The write lock is required: the deferred cleanUpdateAndDeleteArray
	// mutates store fields, which must not happen under a shared read lock.
	s.mux.Lock()
	defer s.mux.Unlock()
	defer s.cleanUpdateAndDeleteArray()

	sewList := make([]*ingress.ServiceWrapper, 0, len(s.sew))
	for _, serviceEntryWrapper := range s.sew {
		sewList = append(sewList, serviceEntryWrapper.DeepCopy())
	}
	return sewList
}

// GetAllProxyWrapper returns deep copies of every cached ProxyWrapper for xds
// push. The result is never nil.
func (s *store) GetAllProxyWrapper() []*ingress.ProxyWrapper {
	s.mux.RLock()
	defer s.mux.RUnlock()

	copies := make([]*ingress.ProxyWrapper, 0, len(s.pw))
	for name := range s.pw {
		copies = append(copies, s.pw[name].DeepCopy())
	}
	return copies
}

// GetAllDestinationRuleWrapper returns the destination rules known to the
// store for xds push: a deep copy of the rule attached to each cached service
// wrapper (when it has one), followed by a wrapper for each config cached
// under the DestinationRule kind. As a side effect the pending incremental
// update/delete lists are reset.
func (s *store) GetAllDestinationRuleWrapper() []*ingress.WrapperDestinationRule {
	// The write lock is required: the deferred cleanUpdateAndDeleteArray
	// mutates store fields, which must not happen under a shared read lock.
	s.mux.Lock()
	defer s.mux.Unlock()
	defer s.cleanUpdateAndDeleteArray()

	drwList := make([]*ingress.WrapperDestinationRule, 0)
	for _, serviceEntryWrapper := range s.sew {
		if serviceEntryWrapper.DestinationRuleWrapper != nil {
			drwList = append(drwList, serviceEntryWrapper.DeepCopy().DestinationRuleWrapper)
		}
	}

	// Rules cached as plain configs are wrapped with a service key derived
	// from the rule host, in the fixed "mcp" namespace.
	configFromMcp := s.configs[gvk.DestinationRule.String()]
	for _, cfg := range configFromMcp {
		dr := cfg.Spec.(*v1alpha3.DestinationRule)
		drwList = append(drwList, &ingress.WrapperDestinationRule{
			DestinationRule: dr,
			ServiceKey:      ingress.ServiceKey{Namespace: "mcp", Name: dr.Host, ServiceFQDN: dr.Host},
		})
	}

	return drwList
}

// GetIncrementalServiceWrapper returns deep copies of the service wrappers
// queued as updated and as deleted since the pending lists were last reset,
// then resets both lists. Both results are non-nil, possibly empty.
func (s *store) GetIncrementalServiceWrapper() ([]*ingress.ServiceWrapper, []*ingress.ServiceWrapper) {
	// The write lock is required: draining the pending lists mutates the
	// store, and under a shared read lock two callers could both receive the
	// same pending items while racing on the reset.
	s.mux.Lock()
	defer s.mux.Unlock()
	defer s.cleanUpdateAndDeleteArray()

	updatedList := make([]*ingress.ServiceWrapper, 0, len(s.toBeUpdated))
	for _, serviceEntryWrapper := range s.toBeUpdated {
		updatedList = append(updatedList, serviceEntryWrapper.DeepCopy())
	}

	deletedList := make([]*ingress.ServiceWrapper, 0, len(s.toBeDeleted))
	for _, serviceEntryWrapper := range s.toBeDeleted {
		deletedList = append(deletedList, serviceEntryWrapper.DeepCopy())
	}

	return updatedList, deletedList
}

// cleanUpdateAndDeleteArray discards the pending incremental update and
// delete lists. It takes no lock itself; callers are expected to hold s.mux.
func (s *store) cleanUpdateAndDeleteArray() {
	s.toBeUpdated, s.toBeDeleted = nil, nil
}

// updateIpMap records service as a user of every endpoint address found in
// data, creating the per-address set on first use. It takes no lock itself;
// callers are expected to hold s.mux.
func (s *store) updateIpMap(service string, data *ingress.ServiceWrapper) {
	for _, ep := range data.ServiceEntry.Endpoints {
		owners := s.ip2services[ep.Address]
		if owners == nil {
			owners = make(map[string]bool)
			s.ip2services[ep.Address] = owners
		}
		owners[service] = true
	}
}

// RemoveEndpointByIp drops ip from the ip index and, for every cached service
// recorded under it, removes the first endpoint whose address equals ip and
// queues the service wrapper as a pending incremental update. Addresses that
// are not in the index are ignored.
func (s *store) RemoveEndpointByIp(ip string) {
	s.mux.Lock()
	defer s.mux.Unlock()

	owners, indexed := s.ip2services[ip]
	if !indexed {
		return
	}
	delete(s.ip2services, ip)

	for name := range owners {
		wrapper, cached := s.sew[name]
		if !cached {
			continue
		}

		// Only the first matching endpoint is removed.
		eps := wrapper.ServiceEntry.Endpoints
		for pos := range eps {
			if eps[pos].Address == ip {
				wrapper.ServiceEntry.Endpoints = append(eps[:pos], eps[pos+1:]...)
				break
			}
		}

		// The wrapper is queued whether or not an endpoint was removed.
		s.toBeUpdated = append(s.toBeUpdated, wrapper)
	}
}
